package com.atguigu.gmall.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.bean.UserRegisterBean;
import com.atguigu.gmall.realtime.util.ClickHouseUtil;
import com.atguigu.gmall.realtime.util.DateFormatUtil;
import com.atguigu.gmall.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * User domain: lightly aggregated per-window user registration counts.
 *
 * <p>Reads JSON registration records from the Kafka topic {@code dwd_user_register},
 * counts the records falling into each 10-second tumbling event-time window, and
 * writes one {@link UserRegisterBean} per window to the ClickHouse table
 * {@code dws_user_user_register_window}.
 *
 * @author caodan
 * @version 1.0
 * @date 2022-10-11 10:28
 */
public class DwsUserUserRegisterWindow {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // TODO: checkpoint-related settings are not configured yet.

        // Read the source records from Kafka.
        String topicName = "dwd_user_register";
        String groupId = "dws_user_register_Window";
        FlinkKafkaConsumer<String> flinkKafkaConsumer = KafkaUtil.getFlinkKafkaConsumer(topicName, groupId);

        // Turn the consumer into a stream of raw JSON strings.
        DataStreamSource<String> jsonStrDS = env.addSource(flinkKafkaConsumer);
        // Parse every string into a JSONObject.
        SingleOutputStreamOperator<JSONObject> jsonObjDS = jsonStrDS.map(JSON::parseObject);

        // Assign event-time timestamps and watermarks, tolerating 3 seconds of out-of-orderness.
        SingleOutputStreamOperator<JSONObject> withWaterMarkDS = jsonObjDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                // "ts" is scaled by 1000, i.e. it is treated as epoch seconds.
                                // NOTE(review): a record without "ts" fails here on unboxing null.
                                return element.getLong("ts") * 1000L;
                            }
                        })
        );

        // Count the records in each 10-second tumbling event-time window.
        SingleOutputStreamOperator<UserRegisterBean> aggregateDS = withWaterMarkDS
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(
                new AggregateFunction<JSONObject, Long, Long>() {
                    @Override
                    public Long createAccumulator() {
                        return 0L;
                    }

                    @Override
                    public Long add(JSONObject jsonObj, Long accumulator) {
                        // Each incoming record contributes one to the window count.
                        return accumulator + 1;
                    }

                    @Override
                    public Long getResult(Long accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Long merge(Long a, Long b) {
                        // Must return the combined accumulator; returning null would break
                        // any merging window assigner (e.g. session windows).
                        return a + b;
                    }
                },
                new AllWindowFunction<Long, UserRegisterBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<Long> values, Collector<UserRegisterBean> out) throws Exception {
                        // The window bounds are the same for every value, so format them once.
                        String stt = DateFormatUtil.toYmdHms(window.getStart());
                        String edt = DateFormatUtil.toYmdHms(window.getEnd());
                        for (Long value : values) {
                            // Bean arguments: window start, window end, count, current wall-clock millis.
                            out.collect(new UserRegisterBean(
                                    stt,
                                    edt,
                                    value,
                                    System.currentTimeMillis()
                            ));
                        }
                    }
                }
        );

        // Write the per-window results to ClickHouse.
        aggregateDS.addSink(ClickHouseUtil.getJdbcSink("insert into dws_user_user_register_window values(?,?,?,?)"));

        env.execute();
    }
}
